Spring Cloud Alibaba集成ElasticAPM实战

Spring Cloud Alibaba集成ElasticAPM实战

在微服务大行其道的今天,Spring Cloud Alibaba作为优秀的微服务实现,却不能很容易的集成ElasticAPM。本文就将解决的思路和实现,呈现给大家,希望能帮助大家。

前言

继上一篇ElasticAPM初体验我们知道了什么是可观察性,并领略了ElasticAPM的强大功能,但是仅仅是上篇文章中单机模式的使用时远远不够的。还记得上一篇最后提出的两个问题:

1、本文在单机版的环境中,测试通过,但是在分布式环境中,请求会串联起很多应用,那服务跟踪能否实现?实现的原理是什么?

2、Elastic APM可以自动采集http请求,在PRC分布式环境中,Elastic APM能否正常工作?是否必须采用 public API来实现?

重点是分布式RPC,即在分布式情况下,ElasticAPM能否良好工作?在RPC环境下,ElasticAPM是不是也能正常工作呢?

先说答案:在默认情况下,ElasticAPM能够支持分布式的Http方式调用,但是不支持RPC协议。但是很多公司都采用RPC协议作为其内部系统的通信协议,比如我司就采用Spring Cloud Alibaba作为搜索服务的框架,框架内应用的通信是借助RPC框架Dubbo来实现的。所以问题就变成了如何把ElasticAPM集成进Spring Cloud Alibaba中。

架构讲解&问题分析

首先,我先大概图示下Spring Cloud Alibaba和ElasticAPM的架构和工作流程。

如架构图所示,搜索系统分为了网关应用(Gateway)US应用AS应用BS应用,用户的请求会先到达网关,网关会把请求,以Http协议转发给US应用,US应用会采用Dubbo协议调用AS应用,AS应用采用Dubbo协议调用BS应用。

Requesthttp—>USRPC—->AS—–RPC——>BS

每一个应用启动的时候都已经集成了Apm-agent(如果不知道怎么集成请参考ElasticAPM初体验),如果APM-agent默认支持Dubbo就完美了(但是并没有)。所以整个链路追踪,到了US之后,就没有上报之后应用的锚点数据。在查看ElasticAPM官方文档的时候,我注意到了Public API,文档中交代了这样一件事情:

The public API of the Elastic APM Java agent lets you customize and manually create spans and transactions, as well as track errors.

没错,你可以自定义SpanTransaction,如果不懂什么是SpanTransaction请参考ElasticAPM初体验或直接读一遍官方文档。既然Agent默认不支持Dubbo,那么我们使用Public API来实现功能。

搜索系统集成APM架构图

设计思想

基于Spring Cloud Alibaba的架构,我们可以如下图方式实现。

  • 首先用户的请求一定要经过微服务网关,在网关的过滤器中,首先埋入父级Transaction
  • 请求经过网关,会被网关转发到第一层应用中,注意这次转发是http请求,如果是用SpringMVC实现的话,需要在Controller处,上报子Transaction
  • 请求被第一层应用处理之后,下层的应用全部是Dubbo协议的。这时可以采用Dubbo的过滤器机制,对ConcumerProvider都进行拦截,通过这种方式做到不侵入业务代码。
  • 最终,请求返回到微服务网关,调用transaction.end()上报根Transaction
  • 所有流程完毕。

Spring Cloud Alibaba链路追踪流程图

核心实现讲解

微服务网关

微服务网关需要做这样几件事情:

  • 开启根Transaction
  • POST请求body中增加追踪ID,GET请求Parameter中增加追踪ID
  • 在请求返回之后调用transaction.end()完成上报
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
HttpMethod httpMethod = exchange.getRequest().getMethod();
//第一步 开启一个Transaction
Transaction transaction = ElasticApm.startTransaction();
transaction.setName("mainSearch");
transaction.setType(Transaction.TYPE_REQUEST);
//第二步 创建Span
Span span = transaction.startSpan("gateway", "filter", "gateway action");
span.setName("com.mfw.search.gateway.filter.PostBodyCacheFilter#filter");

LOGGER.info("APM埋点成功transactionId:{}", transaction.getId());
//第三步 判定Http请求是POST还是GET
if (HttpMethod.POST.equals(httpMethod)) {
ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
MediaType mediaType = exchange.getRequest().getHeaders().getContentType();
//第四步 定义Http body的处理逻辑
Mono<String> modifiedBody = serverRequest.bodyToMono(String.class).flatMap(body -> { //判定body类型
if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {

//重要!获取到了body的数据,传给callback函数,做业务逻辑处理
Map<String, String> bodyMap = decodeBody(body);
//设置最新的bodyMap进入exchange
exchange.getAttributes().put(GatewayConstant.CACHE_POST_BODY, bodyMap);
//重点!动态增加body的transaction标记,为下游应用Controller使用
span.injectTraceHeaders((name, value) -> {
bodyMap.put(name, value);
LOGGER.info("APM埋点 key:{}, transactionId:{}", name, value);
});
//不要忘记span.end()否则会丢失上报
span.end();
return Mono.just(encodeBody(bodyMap));
} else if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
// origin body map
Map<String, String> bodyMap = decodeJsonBody(body);
exchange.getAttributes().put(GatewayConstant.CACHE_POST_BODY, bodyMap);
//重点!动态增加body的transaction标记,为下游应用Controller使用
span.injectTraceHeaders((name, value) -> {
bodyMap.put(name, value);
LOGGER.info("APM埋点 key:{}, transactionId:{}", name, value);
});
span.end();
return Mono.just(encodeJsonBody(bodyMap));
}
return Mono.empty();
});

BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
HttpHeaders headers = new HttpHeaders();
headers.putAll(exchange.getRequest().getHeaders());

// the new content type will be computed by bodyInserter
// and then set in the request decorator
headers.remove(HttpHeaders.CONTENT_LENGTH);

CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {

public HttpHeaders getHeaders() {
long contentLength = headers.getContentLength();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(super.getHeaders());
if (contentLength > 0) {
httpHeaders.setContentLength(contentLength);
} else {
httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
}
return httpHeaders;
}

public Flux<DataBuffer> getBody() {
return outputMessage.getBody();
}
};
//第五步 在请求返回之后,上报transaction
return chain.filter(exchange.mutate().request(decorator).build()).then(Mono.fromRunnable(() -> transaction.end()));
}));
} else if (HttpMethod.GET.equals(httpMethod)) {
span.injectTraceHeaders((name, value) -> {
exchange.getRequest().getQueryParams().set(name, transaction.getId());
LOGGER.info("APM埋点 key:{}, transactionId:{}", name, value);
});
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
span.end();
transaction.end();
LOGGER.info("APM买点完成,transactionId:{}", transaction.getId());
}));
} else {
//not support other Http Method
exchange.getResponse().setStatusCode(HttpStatus.UNSUPPORTED_MEDIA_TYPE);
return exchange.getResponse().setComplete();
}

}

Controller

Controller层的实现采用了SpringAOP方式实现,这样的好处是对业务代码不侵入,可扩展性高,对想要监控的方法直接配置上@TransactionWithRemoteParent()即可。

如下代码是通过@TransactionWithRemoteParent()实现对Controller方法的上报。

1
2
3
4
5
6
@PostMapping(value = "/search", consumes = "application/json", produces = "application/json")
@TransactionWithRemoteParent()
public String searchForm(@RequestBody String req) {
String result = asService.helloAs(req);
return result;
}

AOP实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Aspect
public class ApmAspect {

private static final Logger LOGGER = LoggerFactory.getLogger(ApmAspect.class);

@PostConstruct
private void init() {
LOGGER.info("ApmAspect加载完毕");
}


@Pointcut(value = "@annotation(transactionWithRemoteParent)", argNames = "transactionWithRemoteParent")
public void pointcut(TransactionWithRemoteParent transactionWithRemoteParent) {

}

@Around(value = "pointcut(transactionWithRemoteParent)", argNames = "joinPoint,transactionWithRemoteParent")
public Object around(ProceedingJoinPoint joinPoint, TransactionWithRemoteParent transactionWithRemoteParent) throws Throwable {
Transaction transaction = null;
try {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
transaction = ElasticApm.startTransactionWithRemoteParent(key -> {
String httpRequest = (String) joinPoint.getArgs()[0];
JSONObject json = JSON.parseObject(httpRequest);
String traceId = json.getString(key);
LOGGER.info("切面添加了子Transaction,key={},value={}", key, traceId);
RpcContext.getContext().setAttachment(key, traceId);
return traceId;
});
transaction.setName(StringUtils.isNotBlank(transactionWithRemoteParent.name())
? transactionWithRemoteParent.name() : signature.getName());
transaction.setType(Transaction.TYPE_REQUEST);
return joinPoint.proceed();
} catch (Throwable throwable) {
if (transaction != null) {
transaction.captureException(throwable);
}
throw throwable;
} finally {
if (transaction != null) {
LOGGER.info("切面执行完毕,上报Transaction:{}", transaction.getId());
transaction.end();
}
}
}
}

Dubbo过滤器

如下代码是DubboConsumer过滤器,专门用于处理APM。DubboProvider的实现类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Activate(group = "consumer")
public class DubboConsumerApmFilter implements Filter {

private static final Logger LOGGER = LoggerFactory.getLogger(DubboConsumerApmFilter.class);

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
Transaction transaction = ElasticApm.startTransactionWithRemoteParent(key -> {
String traceId = invocation.getAttachment(key);
LOGGER.info("key={},value={}", key, traceId);
return traceId;
});
try (final Scope scope = transaction.activate()) {
String name = "consumer:" + invocation.getInvoker().getInterface().getName() + "#" + invocation.getMethodName();
transaction.setName(name);
transaction.setType(Transaction.TYPE_REQUEST);

Result result = invoker.invoke(invocation);

return result;
} catch (Exception e) {
transaction.captureException(e);
throw e;
} finally {
transaction.end();
}
}
}

@Activate(group = "provider")
public class DubboProviderApmFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// use startTransactionWithRemoteParent to create transaction with parent, which id from prc context
Transaction transaction = ElasticApm.startTransactionWithRemoteParent(key -> invocation.getAttachment(key));

try (final Scope scope = transaction.activate()) {
String name = "provider:" + invocation.getInvoker().getInterface().getName() + "#" + invocation.getMethodName();
transaction.setName(name);
transaction.setType(Transaction.TYPE_REQUEST);
return invoker.invoke(invocation);
} catch (Exception e) {
transaction.captureException(e);
throw e;
} finally {
transaction.end();
}
}
}

效果

链路追踪效果

源代码

https://github.com/siyuanWang/springCloudAlibabaAPMDemo

参考文档

ElasticAPM集成Dubbo的讨论

评论